from pyspark.sql import SparkSession
# Obtain the Spark session for this notebook: getOrCreate() returns the
# already-active session if there is one, otherwise it starts a new one
# under the application name below.
spark = (
    SparkSession.builder
    .appName("BD Project")
    .getOrCreate()
)
# Third-party packages this notebook needs (install once from a shell):
#   pip install nbformat
#   pip install plotly
#   pip install folium
#   pip install seaborn
from pyspark.sql.functions import *
#Importing Plotly for Plotting Data
import plotly.graph_objects as go
# Imports for plotting Map
import numpy as np
import seaborn as sns
from folium import plugins
%matplotlib inline
import folium
import matplotlib.pyplot as plt
# Matplotlib 3.6 renamed its bundled seaborn styles to "seaborn-v0_8*" and
# later releases dropped the bare "seaborn" name, so use the new name when
# this installation offers it and fall back to the old one otherwise.
plt.style.use('seaborn-v0_8' if 'seaborn-v0_8' in plt.style.available else 'seaborn')
# Load the rat-sightings CSV. The first row supplies the column names and
# Spark infers each column's type from the data.
df_main = (
    spark.read
    .format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load("project_datasets/Rat_Sightings.csv")
)
# Print the inferred schema so the column names/types used below can be checked.
df_main.printSchema()
# Keep only the location-related columns; the borough and location-type
# charts are built from this frame.
df_location = df_main.select(
    'Unique Key', 'Incident Zip', 'City', 'Address Type', 'Location Type', 'Borough'
)
df_borough = df_location.groupBy("Borough").count()
# Drop groups whose borough is missing or recorded as "Unspecified".
df_borough = df_borough.na.drop(subset=["Borough"])
df_borough = df_borough.filter(df_borough.Borough != "Unspecified")
# Fetch every (borough, count) row in ONE action. Running a separate
# collect_list job per column returns each list in an order Spark does not
# guarantee, so a label could be paired with another borough's count.
borough_rows = df_borough.collect()
Borough = [row["Borough"] for row in borough_rows]
BoroughCount = [row["count"] for row in borough_rows]
# Pie chart: share of sightings per borough.
fig = go.Figure(
    data=[go.Pie(labels=Borough, values=BoroughCount)],
    layout_title_text="Borough vs Count of Rat Sightings"
)
fig.show()
# Count sightings per "Location Type".
df_apartment_type = df_location.groupBy("Location Type").count()
# Read label and count together from each row and skip the group whose
# label is null. collect_list silently drops a null label but still returns
# that group's count, which left the two lists with different lengths and
# shifted counts onto the wrong labels.
apartment_pairs = [
    (row["Location Type"], row["count"])
    for row in df_apartment_type.collect()
    if row["Location Type"] is not None
]
ApartmentType = [label for label, _ in apartment_pairs]
ApartmentTypeCount = [total for _, total in apartment_pairs]
# Bar chart: sightings per location type.
fig1 = go.Figure(
    data=[go.Bar(x=ApartmentType, y=ApartmentTypeCount)],
    layout_title_text="Apartment Type vs Count of Rat Sightings"
)
fig1.show()
# Count sightings per city, largest first.
city_data = df_main.groupBy("City").count()
city_data = city_data.sort(col('count').desc())
# collect() on the sorted frame returns rows in sorted order, and reading
# both fields from the same row keeps each city with its own count. The
# null-city group is skipped: collect_list dropped the null label but kept
# its count, shifting every later count onto the wrong city.
city_pairs = [
    (row["City"], row["count"])
    for row in city_data.collect()
    if row["City"] is not None
]
CityData = [city for city, _ in city_pairs]
CityCount = [total for _, total in city_pairs]
# Bar chart: sightings per city.
fig3 = go.Figure(
    data=[go.Bar(x=CityData, y=CityCount)],
    layout_title_text="City vs Count of Rat Sightings"
)
fig3.show()
# Keep the sighting id and its creation timestamp, then add a "Month" column
# holding the leading two characters of "Created Date" (presumably the
# zero-padded month number -- the lookup table that follows is keyed that way).
df_month = (
    df_main
    .select('Unique Key', 'Created Date')
    .withColumn("Month", col("Created Date").substr(0, 2))
)
# Lookup table from a zero-padded month number ("01".."12") to the month's
# English name; the keys are generated from each name's position in the year.
monthsDict = {
    f"{month_number:02d}": month_name
    for month_number, month_name in enumerate(
        (
            "January", "February", "March", "April", "May", "June",
            "July", "August", "September", "October", "November", "December",
        ),
        start=1,
    )
}
# Count sightings per month key and order the groups by that key.
df_month = df_month.groupBy("Month").count()
df_month = df_month.sort(df_month.Month.asc())
# Read month and count from the same collected row so they cannot drift
# apart, and translate the key to its month name while doing so. The group
# with a null month (rows lacking "Created Date") is skipped: collect_list
# dropped that null key but kept its count, which shifted every count by one.
# A non-null key missing from monthsDict still raises KeyError so malformed
# dates are noticed rather than hidden.
month_pairs = [
    (monthsDict[row["Month"]], row["count"])
    for row in df_month.collect()
    if row["Month"] is not None
]
months = [month_name for month_name, _ in month_pairs]
sightingsCount = [total for _, total in month_pairs]
# Bar chart: sightings per calendar month.
fig4 = go.Figure([go.Bar(x=months, y=sightingsCount, marker_color='crimson')],
                 layout_title_text="Month vs Count of Rat Sightings")
fig4.update_xaxes(title_text="Months")
fig4.update_yaxes(title_text="No. of Rats Sightings")
fig4.show()
# Season name -> the three month names counted towards that season.
# Keyword order fixes the iteration order: Winter, Spring, Summer, Autumn.
SeasonDict = dict(
    Winter=["December", "January", "February"],
    Spring=["March", "April", "May"],
    Summer=["June", "July", "August"],
    Autumn=["September", "October", "November"],
)
# Total the monthly sighting counts for each season, producing two parallel
# lists: `seasons` (names, in SeasonDict order) and `Seasoncount` (totals).
#
# - A month-name -> count lookup replaces a list.index() scan per month and
#   lets a month with no sightings contribute 0 instead of raising ValueError.
# - The accumulator is deliberately not called `count`: that name is the
#   `count` function brought in by `from pyspark.sql.functions import *`, and
#   rebinding it to an int would break any later use of the function.
# - An explicit loop is used rather than sum(): the same star import replaces
#   the builtin `sum` with Spark's column aggregate.
monthly_totals = dict(zip(months, sightingsCount))
seasons = []
Seasoncount = []
for season_name, season_months in SeasonDict.items():
    season_total = 0
    for month_name in season_months:
        season_total += monthly_totals.get(month_name, 0)
    seasons.append(season_name)
    Seasoncount.append(season_total)
# Bar chart of the seasonal totals, with labelled axes.
fig5 = go.Figure(
    data=[go.Bar(x=seasons, y=Seasoncount, marker_color='blue')],
    layout_title_text="Seasons vs Count of Rat Sightings",
)
fig5.update_xaxes(title_text="Seasons")
fig5.update_yaxes(title_text="No. of Rats Sightings")
fig5.show()

# The same seasonal totals as a pie chart, showing each season's share.
fig6 = go.Figure(
    data=[go.Pie(labels=seasons, values=Seasoncount)],
    layout_title_text="Season vs Count of Rat Sightings",
)
fig6.show()
df_year = df_main.select('Unique Key', 'Created Date')
# Take characters 7-10 of "Created Date" as the year (presumably the yyyy
# part of an MM/dd/yyyy timestamp -- confirm against the data). This selects
# the same characters as the earlier substr(0,10).substr(7,8) chain.
df_year = df_year.withColumn("Year", df_year["Created Date"].substr(7, 4))
df_year = df_year.groupBy("Year").count()
df_year = df_year.sort(df_year.Year.asc())
# Read year and count from the same collected row, in sorted order, and skip
# the null-year group: collect_list dropped a null key but kept its count,
# which misaligned the x and y series.
year_pairs = [
    (row["Year"], row["count"])
    for row in df_year.collect()
    if row["Year"] is not None
]
years = [year for year, _ in year_pairs]
sightingsCountPerYear = [total for _, total in year_pairs]
# Line/marker plot: sightings per year.
fig7 = go.Figure([go.Scatter(x=years, y=sightingsCountPerYear, marker_color='green')],
                 layout_title_text="Year vs Count of Rat Sightings")
fig7.update_xaxes(title_text="Year")
fig7.update_yaxes(title_text="No. of Rats Sightings")
fig7.show()
# Load the subway entrance/exit CSV: comma-delimited, header row for column
# names, column types inferred from the data.
df_subway = (
    spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .option("delimiter", ',')
    .csv('project_datasets/NYC_Transit_Subway_Entrance_And_Exit_Data.csv')
)
# Print the inferred schema for reference.
df_subway.printSchema()
# Overwrite "Station Name" with "<station name> <line>".
station_with_line = concat(col('Station Name'), lit(' '), col('Line'))
df1_station1 = df_subway.withColumn("Station Name", station_with_line)
# Distinct (name, location) pairs, one per station/line combination.
df1_station = df1_station1.select('Station Name', 'Station Location').distinct()
# Distinct entrance coordinates, used later for the map markers.
df1_lat_long = df1_station1.select('Entrance Latitude', 'Entrance Longitude').distinct()
# Preview the stations in name order.
df1_station.orderBy('Station Name').show()
from pyspark.sql import functions as F
# Sightings that carry a non-empty "Location" string.
df3 = df_main.where(df_main.Location != "").select('Unique Key', 'Location')
df3.printSchema()
# Strip the surrounding parentheses from "Location" before splitting it.
# The previous first pattern, "\(*.\)", matched "any character followed by
# ')'" and therefore also deleted the LAST DIGIT of the second coordinate.
# Each step now removes exactly one kind of parenthesis, and the patterns
# are raw strings so the backslashes reach the regex engine unchanged.
df4 = df3.withColumn('Location', regexp_replace(col("Location"), r"\)", ""))
df5 = df4.withColumn('Location', regexp_replace(col("Location"), r"\(", ""))
# Split on the comma and cast the pieces to doubles (array<double>).
rat_sightings = df5.select('Unique Key', 'Location').withColumn(
    'Location', split(col('Location'), ',').cast('array<double>')
)
# Mark the frame for caching so later actions can reuse it.
rat_sightings.cache()
# Strip the surrounding parentheses from "Station Location" before splitting.
# The previous first pattern, "\(*.\)", matched "any character followed by
# ')'" and therefore also deleted the LAST DIGIT of the second coordinate.
# Each step now removes exactly one kind of parenthesis, and the patterns
# are raw strings so the backslashes reach the regex engine unchanged.
df1_pre = df1_station.withColumn(
    'Station Location', regexp_replace(col("Station Location"), r"\)", "")
)
df1_post = df1_pre.withColumn(
    'Station Location', regexp_replace(col("Station Location"), r"\(", "")
)
# Split on the comma and cast the pieces to doubles (array<double>).
subway_station = df1_post.select('Station Name', 'Station Location').withColumn(
    'Station Location', split(col('Station Location'), ',').cast('array<double>')
)
# Show five untruncated rows and the resulting schema as a sanity check.
subway_station.show(5, False)
subway_station.printSchema()
# Mark the frame for caching so later actions can reuse it.
subway_station.cache()
# Create two folium base maps with the same centre (lat 40.7, lon -73.9) and
# zoom level: one for the subway overlay and one for rat sightings alone.
map_subway = folium.Map(location=[40.7, -73.9], zoom_start=11)
map_rats = folium.Map(location=[40.7, -73.9], zoom_start=11)
# Rat-sighting coordinates: rows with a non-empty "Location", with both
# coordinate columns cast to float.
df_2 = df_main.where(df_main.Location != "").select('Latitude', 'Longitude')
df2_lat = df_2.withColumn('Latitude', col('Latitude').cast("float"))
df2_long = df2_lat.withColumn('Longitude', col('Longitude').cast("float"))
# Raw "Location" strings for the same rows.
df_loc = df_main.where(df_main.Location != "").select('Location')

# Subway entrance coordinates, cast to float the same way.
df1_latlong_temp1 = df1_lat_long.withColumn(
    'Entrance Latitude', col('Entrance Latitude').cast("float")
)
df1_latlong_temp2 = df1_latlong_temp1.withColumn(
    'Entrance Longitude', col('Entrance Longitude').cast("float")
)

# Bring both coordinate sets to the driver as NumPy arrays of (lat, lon) rows.
entrance_rows = df1_latlong_temp2.select('Entrance Latitude', 'Entrance Longitude').collect()
sighting_rows = df2_long.select('Latitude', 'Longitude').collect()
locs = np.array(entrance_rows)
rats = np.array(sighting_rows)
# Subway map: clustered markers at the entrance coordinates plus the
# rat-sighting heat map on top.
plugins.MarkerCluster(locs).add_to(map_subway)
# Attach the heat maps with add_to(), as done for the marker cluster above;
# `add_children` is a deprecated alias that newer folium/branca releases no
# longer provide.
plugins.HeatMap(rats, radius=15).add_to(map_subway)
# Rat-only map: the heat map alone.
plugins.HeatMap(rats, radius=15).add_to(map_rats)
# Render both maps in the notebook output.
display(map_rats)
display(map_subway)